package com.atuguigu.flink.Day02.window;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Computes the minimum and maximum temperature within a tumbling time window; the window length is 5 s.
/**
 * Flink streaming job that tracks the lowest and highest temperature per
 * tumbling processing-time window.
 *
 * <p>Pipeline, in order: read from {@code SensorSource}, keep only readings
 * whose id equals {@code "sensor_1"}, turn every reading into an
 * {@code (id, temperature, temperature)} tuple, key by id, group into
 * 5-second tumbling processing-time windows, fold every window into
 * {@code (id, min, max)} and hand the result to {@code print()}.
 */
public class Example2 {

    /** Only readings carrying this id pass the filter. */
    private static final String SENSOR_ID = "sensor_1";

    /** Length of one tumbling window, in seconds. */
    private static final long WINDOW_SECONDS = 5;

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<SendsorReading> readings = environment.addSource(new SensorSource());

        readings
                .filter(reading -> reading.id.equals(SENSOR_ID))
                .map(new MinMaxSeed())
                .keyBy(seed -> seed.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .reduce(new MinMaxReducer())
                .print();

        environment.execute();
    }

    /**
     * Converts one reading into an {@code (id, temperature, temperature)} tuple.
     * The temperature is stored twice so that field {@code f1} can later hold the
     * running minimum and field {@code f2} the running maximum.
     */
    private static class MinMaxSeed implements MapFunction<SendsorReading, Tuple3<String, Double, Double>> {

        @Override
        public Tuple3<String, Double, Double> map(SendsorReading value) throws Exception {
            return Tuple3.of(value.id, value.temperture, value.temperture);
        }
    }

    /**
     * Merges two {@code (id, min, max)} tuples: keeps the id of the first one,
     * the smaller of the two {@code f1} values and the larger of the two
     * {@code f2} values.
     */
    private static class MinMaxReducer implements ReduceFunction<Tuple3<String, Double, Double>> {

        @Override
        public Tuple3<String, Double, Double> reduce(Tuple3<String, Double, Double> t1,
                                                     Tuple3<String, Double, Double> t2) throws Exception {
            double lowest = Math.min(t1.f1, t2.f1);
            double highest = Math.max(t1.f2, t2.f2);
            return Tuple3.of(t1.f0, lowest, highest);
        }
    }
}
